<?php

namespace app\service\alone\Connect;

use app\service\alone\BaseService;
use app\service\interfaces\MessageQueueInterface;
use Hhxsv5\SSE\Event;
use Hhxsv5\SSE\SSE;
use Hhxsv5\SSE\StopSSEException;

/**
 * Server-Sent Events (SSE) service.
 *
 * Subscribes to a message-queue channel and streams the messages fetched from
 * it to the client as SSE events. A queue implementation must be supplied via
 * registerQueueService() before boot() is called.
 */
class SseService extends BaseService
{
    /** @var int $stepTime Interval between two pushes, in seconds */
    private $stepTime = 1;

    /** @var int Maximum number of messages fetched from the queue per poll */
    private $maxNum = 10;

    /** @var MessageQueueInterface|null $queue Message-queue service; null until registerQueueService() is called */
    private $queue;

    /** @var string Subscriber id; intended to allow targeting a specific subscriber later on */
    private $subscriber;

    /**
     * Prepares the streaming response: lifts the execution time limit, emits
     * the SSE headers, builds the subscriber id and reads the optional
     * "stepTime" override from $this->data.
     *
     * NOTE(review): presumably invoked by BaseService during construction — confirm.
     */
    protected function init()
    {
        // No execution time limit: this service is meant to run without one.
        set_time_limit(0);

        // Emit the SSE response headers.
        $this->setHeader();

        // Subscriber id: optional caller-supplied prefix followed by a SHA-1 of a random value.
        $prefix = isField($this->data, 'subscriber') ? $this->data['subscriber'] . '-' : '';
        $this->subscriber = $prefix . sha1(randomNum(rand(1, 999999)));

        // Push interval. Only whole seconds >= 1 are accepted: the value comes
        // from request data, and 0 / negative / non-numeric input would make the
        // polling loop spin without pausing (or fail outright). Anything else
        // keeps the default.
        if (isField($this->data, 'stepTime')) {
            $stepTime = filter_var(
                $this->data['stepTime'],
                FILTER_VALIDATE_INT,
                ['options' => ['min_range' => 1]]
            );

            if (false !== $stepTime) {
                $this->stepTime = $stepTime;
            }
        }
    }

    /**
     * Starts the SSE stream.
     *
     * Subscribes to $channel, then on every tick fetches up to $maxNum queued
     * messages and hands them to $callback together with the subscriber id.
     * The callback is expected to return an array (or ArrayAccess) with a
     * "code" entry and a "data" entry; "data" is JSON-encoded and pushed to
     * the client only when "code" equals 200.
     *
     * @param string $channel Queue channel to subscribe to and read from
     * @param string $event SSE event name
     * @param callable $callback Business handler, called as $callback($list, $subscriber)
     *
     * @throws \LogicException When no queue service has been registered
     */
    public function boot(string $channel, string $event, callable $callback): void
    {
        // Fail with a clear message instead of a "member function on null" fatal error.
        if (null === $this->queue) {
            throw new \LogicException('No message queue registered: call registerQueueService() before boot().');
        }

        // Subscribe to the channel.
        $this->queue->subscribe([$channel], $this->subscriber);

        $producer = function () use ($callback, $channel) {
            // Fetch the channel data; if the client was removed, or anything
            // else goes wrong, stop the stream.
            try {
                $list = $this->queue->data($channel, $this->subscriber, $this->maxNum);

                $result = $callback($list, $this->subscriber);

                // A malformed result is treated like a failed one: nothing is
                // sent. Checking the shape avoids notices/warnings that could
                // otherwise be written into the event stream.
                if (!is_array($result) && !($result instanceof \ArrayAccess)) return false;

                // Business logic reported a failure: send nothing for this tick.
                if (200 != ($result['code'] ?? null)) return false;

                return json_encode($result['data'] ?? null);
            } catch (\Exception $exception) {
                throw new StopSSEException();
            }
        };

        // Open the SSE connection and keep pushing every $stepTime seconds.
        (new SSE(new Event($producer, $event)))->start($this->stepTime);
    }

    /**
     * Registers the message-queue service used by boot().
     *
     * @param MessageQueueInterface $queue
     * @return $this
     */
    public function registerQueueService(MessageQueueInterface $queue): self
    {
        $this->queue = $queue;

        return $this;
    }

    /**
     * Sends the response headers used for an SSE stream.
     */
    private function setHeader(): void
    {
        header('Content-Type: text/event-stream');
        header('Cache-Control: no-cache');
        header('Connection: keep-alive');
        // Asks nginx not to buffer the response.
        header('X-Accel-Buffering: no');
    }
}